我们都是架构师!
关注架构师(JiaGouX),添加“星标”
获取每天技术干货,一起成为牛逼架构师
技术群请加若飞:1321113940 进架构师群
投稿、合作、版权等邮箱:admin@137x.com
BIO(background I/O service)
线程来异步处理aof持久化、关闭文件的任务,在Redis v4.0版本中又添加了一个BIO线程,将比较耗时的命令异步化,到了Redis v6.0,Redis核心的网络模型也被改造成了多线程。1. Redis基础架构设计
I/O multiplexing
实现了单线程接收海量客户端请求;通过单线程Reactor
模型实现了高性能的事件处理;基于条件变量实现的生产者消费者模型构建了自己的BIO系统
。本节先简单介绍一下这些从Redis诞生以来就一直使用的基础架构设计。Redis的I/O多路复用模块提供的API:
//下面的方法不同版本的redis在src目录下的ae_epoll.c、ae_evport.c、ae_kqueue.c、ae_select.c代码文件中都有实现
static int aeApiCreate(aeEventLoop *eventLoop)
static int aeApiResize(aeEventLoop *eventLoop, int setsize)
static void aeApiFree(aeEventLoop *eventLoop)
static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask)
static void aeApiDelEvent(aeEventLoop *eventLoop, int fd, int mask)
static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp)
Redis可以在多个平台上运行,所以会通过宏定义,根据编译平台的不同,选择不同的I/O多路复用函数作为子模块,提供给上层接口做封装。
/*下面代码在Redis不同版本的ae.c源码文件中均包含
*Include the best multiplexing layer supported by this system.
* The following should be ordered by performances, descending. */
#ifdef HAVE_EVPORT
#include "ae_evport.c"
#else
#ifdef HAVE_EPOLL
#include "ae_epoll.c"
#else
#ifdef HAVE_KQUEUE
#include "ae_kqueue.c"
#else
#include "ae_select.c"
#endif
#endif
#endif
I/O 多路复用(I/O multiplexing) + 非阻塞 I/O(non-blocking I/O)
的模式。通常设置一个主线程负责做 event-loop 事件循环和 I/O 读写,通过 select/poll/epoll_wait 等系统调用监听 I/O 事件,业务逻辑提交给其他工作线程去做。而所谓『非阻塞 I/O』的核心思想是指避免阻塞在 read() 或者 write() 或者其他的 I/O 系统调用上,这样可以最大限度的复用 event-loop 线程,让一个线程能服务于多个 sockets。在 Reactor 模式中,I/O 线程只能阻塞在 I/O multiplexing 函数上(select/poll/epoll_wait)。单线程的Reactor网络模型是这样的:pthread_mutex_init(pthread_mutex_t mutex,const pthread_mutexattr_t attr)
pthread_mutex_lock(pthread_mutex_t *mutex)
pthread_mutex_trylock(pthread_mutex_t *mutex)
pthread_mutex_unlock(pthread_mutex_t *mutex)
pthread_mutex_destroy(pthread_mutex_t *mutex)
pthread_mutex_t mutex;
pthread_mutex_init(&mutex,NULL);
int pthread_cond_init(pthread_cond_t cond, pthread_condattr_t cond_attr)
int pthread_cond_wait(pthread_cond_t cond, pthread_mutex_t mutex)
int pthread_cond_signal(pthread_cond_t *cond)
int pthread_cond_broadcast(pthread_cond_t *cond)
int pthread_cond_timedwait(pthread_cond_t cond, pthread_mutex_t mutex, const struct timespec *abstime)
线程1
pthread_mutex_lock(&mutex);
while (condition == FALSE) {
pthread_cond_wait(&cond, &mutex);
}
pthread_mutex_unlock(&mutex);
线程2
condition = TRUE;
pthread_cond_signal(&cond);
线程1 线程2
pthread_mutex_lock(&mutex);
while (condition == FALSE)
condition = TRUE;
pthread_cond_signal(&cond);
pthread_cond_wait(&cond, &mutex);
线程1 线程2
pthread_mutex_lock(&mutex); pthread_mutex_lock(&mutex);
while (condition == FALSE) { condition = TRUE;
pthread_cond_wait(&cond, &mutex); pthread_cond_signal(&cond);
} pthread_mutex_unlock(&mutex);
pthread_mutex_unlock(&mutex);
2. Redis核心网络模型架构的演进
client
:客户端对象,Redis 是典型的 CS 架构(Client <---> Server),客户端通过 socket 与服务端建立网络通道然后发送请求命令,服务端执行请求的命令并回复。Redis 使用结构体 client 存储客户端的所有相关信息,包括但不限于封装的套接字连接 *conn,当前选择的数据库指针 *db,读入缓冲区 querybuf,写出缓冲区 buf,写出数据链表 reply等。/* client对象在redis不同版本的redis.h中均有定义
* With multiplexing we need to take per-client state.
* Clients are taken in a linked list. */
typedef struct client {
uint64_t id; /* Client incremental unique ID. */
int fd; /* Client socket. */
redisDb *db; /* Pointer to currently SELECTed DB. */
robj *name; /* As set by CLIENT SETNAME. */
sds querybuf; /* Buffer we use to accumulate client queries. */
size_t qb_pos; /* The position we have read in querybuf. */
sds pending_querybuf; /* If this client is flagged as master, this buffer
represents the yet not applied portion of the
replication stream that we are receiving from
the master. */
size_t querybuf_peak; /* Recent (100ms or more) peak of querybuf size. */
int argc; /* Num of arguments of current command. */
robj **argv; /* Arguments of current command. */
struct redisCommand *cmd, *lastcmd; /* Last command executed. */
...... //此处省略很多其它的属性
/* Response buffer */
int bufpos;
char buf[PROTO_REPLY_CHUNK_BYTES];
} client;
aeApiPoll
:这个我们前边提到过,I/O 多路复用 API,是基于 epoll_wait/select/kevent 等系统调用的封装,监听等待读写事件触发,然后处理,它是事件循环(Event Loop)中的核心函数,是事件驱动得以运行的基础。acceptTcpHandler
:连接应答处理器,底层使用系统调用 accept 接受来自客户端的新连接,并为新连接注册绑定命令读取处理器,以备后续处理新的客户端 TCP 连接;除了这个处理器,还有对应的 acceptUnixHandler 负责处理 Unix Domain Socket 以及 acceptTLSHandler 负责处理 TLS 加密连接。readQueryFromClient
:命令读取处理器,解析并执行客户端的请求命令。beforeSleep
:事件循环中进入 aeApiPoll 等待事件到来之前会执行的函数,其中包含一些日常的任务,比如把 client->buf 或者 client->reply (后面会解释为什么这里需要两个缓冲区)中的响应写回到客户端,持久化 AOF 缓冲区的数据到磁盘等,相对应的还有一个 afterSleep 函数,在 aeApiPoll 之后执行。sendReplyToClient
:命令回复处理器,当一次事件循环之后写出缓冲区中还有数据残留,则这个处理器会被注册绑定到相应的连接上,等连接触发写就绪事件时,它会将写出缓冲区剩余的数据回写到客户端。Redis
官方给出的FAQRedis
性能表现的瓶颈所在,更多情况下是受到内存大小和网络I/O的限制,所以Redis核心网络模型使用单线程并没有什么问题,如果你想要使用服务的多核CPU,可以在一台服务器上启动多个实例或者采用分片集群的方式。/* Initialize the data structures needed for threaded I/O. */
void initThreadedIO(void) {
io_threads_active = 0; /* We start with threads not active. */
//如果只配置了一个线程,则所有的I/O放到主线程上执行
if (server.io_threads_num == 1) return;
//最多配置的线程数量不超过128
if (server.io_threads_num > IO_THREADS_MAX_NUM) {
serverLog(LL_WARNING,"Fatal: too many I/O threads configured. "
"The maximum number is %d.", IO_THREADS_MAX_NUM);
exit(1);
}
//启动线程,线程数量为配置中的线程数
for (int i = 0; i < server.io_threads_num; i++) {
//创建I/O线程的本地任务队列
io_threads_list[i] = listCreate();
if (i == 0) continue; //0号线程是主线程
//初始化 I/O 线程并启动。
pthread_t tid;
// 每个 I/O 线程会分配一个本地锁,用来休眠和唤醒线程。
pthread_mutex_init(&io_threads_mutex[i],NULL);
// 每个 I/O 线程分配一个原子计数器,用来记录当前遗留的任务数量。
io_threads_pending[i] = 0;
// 主线程在启动 I/O 线程的时候会默认先锁住它,直到有 I/O 任务才唤醒它。
pthread_mutex_lock(&io_threads_mutex[i]);
// 启动线程,进入 I/O 线程的主逻辑函数 IOThreadMain
if (pthread_create(&tid,NULL,IOThreadMain,(void*)(long)i) != 0) {
serverLog(LL_WARNING,"Fatal: Can't initialize IO thread.");
exit(1);
}
io_threads[i] = tid;
}
}
initThreadedIO
会在Redis服务器启动的末尾被调用,进行I/O多线程的初始化工作。Redis的多线程模式是关闭的,需要用户在 redis.conf 配置文件中开启并设置:
io-threads 2
io-threads-do-reads yes
void readQueryFromClient(connection *conn) {
client *c = connGetPrivateData(conn);
int nread, readlen;
size_t qblen;
//如果开启了多线程,将client加入到异步队列之后返回
if (postponeClientRead(c)) return;
...... //省略代码,逻辑同单线程版本几乎一样
}
/* Return 1 if we want to handle the client read later using threaded I/O.
* This is called by the readable handler of the event loop.
* As a side effect of calling this function the client is put in the
* pending read clients and flagged as such. */
int postponeClientRead(client *c) {
//当多线程 I/O 模式开启、主线程没有在处理阻塞任务时,将 client 加入异步队列。
if (io_threads_active &&
server.io_threads_do_reads &&
!ProcessingEventsWhileBlocked &&
!(c->flags & (CLIENT_MASTER|CLIENT_SLAVE|CLIENT_PENDING_READ)))
{
// 给 client 打上 CLIENT_PENDING_READ 标识,表示该 client 需要被多线程处理,
// 后续在 I/O 线程中会在读取和解析完客户端命令之后判断该标识并放弃执行命令,让主线程去执行。
c->flags |= CLIENT_PENDING_READ;
listAddNodeHead(server.clients_pending_read,c);
return 1;
} else {
return 0;
}
}
主线程会在事件循环的 beforeSleep() 方法中,调用 handleClientsWithPendingReadsUsingThreads:
/* When threaded I/O is also enabled for the reading + parsing side, the
* readable handler will just put normal clients into a queue of clients to
* process (instead of serving them synchronously). This function runs
* the queue using the I/O threads, and process them in order to accumulate
* the reads in the buffers, and also parse the first command available
* rendering it in the client structures. */
int handleClientsWithPendingReadsUsingThreads(void) {
if (!io_threads_active || !server.io_threads_do_reads) return 0;
int processed = listLength(server.clients_pending_read);
if (processed == 0) return 0;
if (tio_debug) printf("%d TOTAL READ pending clients\n", processed);
// 遍历待读取的 client 队列 clients_pending_read,
// 通过 RR 轮询均匀地分配给 I/O 线程和主线程自己(编号 0)。
listIter li;
listNode *ln;
listRewind(server.clients_pending_read,&li);
int item_id = 0;
while((ln = listNext(&li))) {
client *c = listNodeValue(ln);
int target_id = item_id % server.io_threads_num;
listAddNodeTail(io_threads_list[target_id],c);
item_id++;
}
// 设置当前 I/O 操作为读取操作,给每个 I/O 线程的计数器设置分配的任务数量,
// 让 I/O 线程可以开始工作:只读取和解析命令,不执行。
io_threads_op = IO_THREADS_OP_READ;
for (int j = 1; j < server.io_threads_num; j++) {
int count = listLength(io_threads_list[j]);
io_threads_pending[j] = count;
}
// 主线程自己也会去执行读取客户端请求命令的任务,以达到最大限度利用 CPU。
listRewind(io_threads_list[0],&li);
while((ln = listNext(&li))) {
client *c = listNodeValue(ln);
readQueryFromClient(c->conn);
}
listEmpty(io_threads_list[0]);
// 忙轮询,累加所有 I/O 线程的原子任务计数器,直到所有计数器的遗留任务数量都是 0,
// 表示所有任务都已经执行完成,结束轮询。
while(1) {
unsigned long pending = 0;
for (int j = 1; j < server.io_threads_num; j++)
pending += io_threads_pending[j];
if (pending == 0) break;
}
if (tio_debug) printf("I/O READ All threads finshed\n");
// 遍历待读取的 client 队列,清除 CLIENT_PENDING_READ 和 CLIENT_PENDING_COMMAND 标记,
// 然后解析并执行所有 client 的命令。
while(listLength(server.clients_pending_read)) {
ln = listFirst(server.clients_pending_read);
client *c = listNodeValue(ln);
c->flags &= ~CLIENT_PENDING_READ;
listDelNode(server.clients_pending_read,ln);
if (c->flags & CLIENT_PENDING_COMMAND) {
c->flags &= ~CLIENT_PENDING_COMMAND;
// client 的第一条命令已经被解析好了,直接尝试执行。
if (processCommandAndResetClient(c) == C_ERR) {
/* If the client is no longer valid, we avoid
* processing the client later. So we just go
* to the next. */
continue;
}
}
// 继续解析并执行 client 命令。
processInputBuffer(c);
}
return processed;
}
这里主线程所做的核心工作就是:
遍历待读取的 client 队列 clients_pending_read,通过 RR 策略把所有任务分配给 I/O 线程和主线程去读取和解析客户端命令。
忙轮询等待所有 I/O 线程完成任务。
等主线程和其它I/O吓成一起完成了读请求处理以后,最后再遍历 clients_pending_read,执行所有 client 的命令。
与读取相求相对应的自然是写回响应了,完成命令的读取、解析以及执行之后,客户端命令的响应数据已经存入 client->buf 或者 client->reply 中了,接下来就需要把响应数据回写到客户端了,还是在 beforeSleep 中, 主线程调用 handleClientsWithPendingWritesUsingThreads:
int handleClientsWithPendingWritesUsingThreads(void) {
int processed = listLength(server.clients_pending_write);
if (processed == 0) return 0; /* Return ASAP if there are no clients. */
// 如果用户设置的 I/O 线程数等于 1 或者当前 clients_pending_write 队列中待写出的 client
// 数量不足 I/O 线程数的两倍,则不用多线程的逻辑,让所有 I/O 线程进入休眠,
// 直接在主线程把所有 client 的相应数据回写到客户端。
if (server.io_threads_num == 1 || stopThreadedIOIfNeeded()) {
return handleClientsWithPendingWrites();
}
// 唤醒正在休眠的 I/O 线程(如果有的话)。
if (!io_threads_active) startThreadedIO();
if (tio_debug) printf("%d TOTAL WRITE pending clients\n", processed);
// 遍历待写出的 client 队列 clients_pending_write,
// 通过 RR 轮询均匀地分配给 I/O 线程和主线程自己(0号线程)。
listIter li;
listNode *ln;
listRewind(server.clients_pending_write,&li);
int item_id = 0;
while((ln = listNext(&li))) {
client *c = listNodeValue(ln);
c->flags &= ~CLIENT_PENDING_WRITE;
int target_id = item_id % server.io_threads_num;
listAddNodeTail(io_threads_list[target_id],c);
item_id++;
}
// 设置当前 I/O 操作为写出操作,给每个 I/O 线程的计数器设置分配的任务数量,
// 让 I/O 线程可以开始工作,把写出缓冲区(client->buf 或 c->reply)中的响应数据回写到客户端。
io_threads_op = IO_THREADS_OP_WRITE;
for (int j = 1; j < server.io_threads_num; j++) {
int count = listLength(io_threads_list[j]);
io_threads_pending[j] = count;
}
// 主线程自己也会去执行读取客户端请求命令的任务,以达到最大限度利用 CPU。
listRewind(io_threads_list[0],&li);
while((ln = listNext(&li))) {
client *c = listNodeValue(ln);
writeToClient(c,0);
}
listEmpty(io_threads_list[0]);
// 忙轮询,累加所有 I/O 线程的原子任务计数器,直到所有计数器的遗留任务数量都是 0。
// 表示所有任务都已经执行完成,结束轮询。
while(1) {
unsigned long pending = 0;
for (int j = 1; j < server.io_threads_num; j++)
pending += io_threads_pending[j];
if (pending == 0) break;
}
if (tio_debug) printf("I/O WRITE All threads finshed\n");
// 最后再遍历一次 clients_pending_write 队列,检查是否还有 client 的中写出缓冲区中有残留数据,
// 如果有,那就为 client 注册一个命令回复器 sendReplyToClient,等待客户端写就绪再继续把数据回写。
listRewind(server.clients_pending_write,&li);
while((ln = listNext(&li))) {
client *c = listNodeValue(ln);
// 检查 client 的写出缓冲区是否还有遗留数据。
if (clientHasPendingReplies(c) &&
connSetWriteHandler(c->conn, sendReplyToClient) == AE_ERR)
{
freeClientAsync(c);
}
}
listEmpty(server.clients_pending_write);
return processed;
}
void *IOThreadMain(void *myid) {
/* The ID is the thread number (from 0 to server.iothreads_num-1), and is
* used by the thread to just manipulate a single sub-array of clients. */
long id = (unsigned long)myid;
char thdname[16];
snprintf(thdname, sizeof(thdname), "io_thd_%ld", id);
redis_set_thread_title(thdname);
while(1) {
// 忙轮询,100w 次循环,等待主线程分配 I/O 任务。
for (int j = 0; j < 1000000; j++) {
if (io_threads_pending[id] != 0) break;
}
// 如果 100w 次忙轮询之后如果还是没有任务分配给它,则通过尝试加锁进入休眠,
// 等待主线程分配任务之后调用 startThreadedIO 解锁,唤醒 I/O 线程去执行。
if (io_threads_pending[id] == 0) {
pthread_mutex_lock(&io_threads_mutex[id]);
pthread_mutex_unlock(&io_threads_mutex[id]);
continue;
}
serverAssert(io_threads_pending[id] != 0);
if (tio_debug) printf("[%ld] %d to handle\n", id, (int)listLength(io_threads_list[id]));
// 注意:主线程分配任务给 I/O 线程之时,
// 会把任务加入每个线程的本地任务队列 io_threads_list[id],
// 但是当 I/O 线程开始执行任务之后,主线程就不会再去访问这些任务队列,避免数据竞争。
listIter li;
listNode *ln;
listRewind(io_threads_list[id],&li);
while((ln = listNext(&li))) {
client *c = listNodeValue(ln);
// 如果当前是写出操作,则把 client 的写出缓冲区中的数据回写到客户端。
if (io_threads_op == IO_THREADS_OP_WRITE) {
writeToClient(c,0);
// 如果当前是读取操作,则socket 读取客户端的请求命令并解析第一条命令。
} else if (io_threads_op == IO_THREADS_OP_READ) {
readQueryFromClient(c->conn);
} else {
serverPanic("io_threads_op value is unknown");
}
}
listEmpty(io_threads_list[id]);
// 所有任务执行完之后把自己的计数器置 0,主线程通过累加所有 I/O 线程的计数器
// 判断是否所有 I/O 线程都已经完成工作。
io_threads_pending[id] = 0;
if (tio_debug) printf("[%ld] Done\n", id);
}
}
Redis
通过多线程来提升性能达到的效果可能比你想象中的还要好,下边是从ITNEXT平台提取的一张压测结果报告图,更详细的内容可参考:对实验性Redis多线程I / O进行基准测试Redis
能够达到20W+的QPS,相比较于单线程,性能提升了将近一倍。3. Redis BIO系统的演进
本文一开始就给出了结论:对于整个服务端架构的设计而言,Redis从来都不是单线程的。我们讨论的单线程,只是针对Redis的核心网络模型而言,这部分的内容我们在上一节已经深入剖析过了。对于Redis的BIO系统,也就是这些所谓的后台线程,也经历了一个演进的过程,在这个过程中都发生了哪些事?Redis的作者antirez对BIO系统做了哪些优化呢?本节我们就来一起探索下这两个问题。
static pthread_t bio_threads[BIO_NUM_OPS];
static pthread_mutex_t bio_mutex[BIO_NUM_OPS];
static pthread_cond_t bio_condvar[BIO_NUM_OPS];
static list *bio_jobs[BIO_NUM_OPS];
for (j = 0; j < BIO_NUM_OPS; j++) {
pthread_mutex_init(&bio_mutex[j],NULL);
pthread_cond_init(&bio_condvar[j],NULL);
bio_jobs[j] = listCreate();
bio_pending[j] = 0;
}//初始化锁与条件变量
......
......
for (j = 0; j < BIO_NUM_OPS; j++) {
void *arg = (void*)(unsigned long) j;
//这里的函数参数是arg = j,也就是每个线程传入一个编号j,0代表关闭文件,1代表aof初始化
if (pthread_create(&thread,&attr,bioProcessBackgroundJobs,arg) != 0) {
serverLog(LL_WARNING,"Fatal: Can't initialize Background Jobs.");
exit(1);
}
bio_threads[j] = thread;
}//初始化线程
struct bio_job {
time_t time;
void *arg1, *arg2, *arg3;
};
可以看到,任务不是很复杂,只记录一个时间和参数就可以了,后面讲任务执行的时候,会讲到这样一个简单的结构记录的任务怎么执行。任务入队列的代码如下:
void bioCreateBackgroundJob(int type, void *arg1, void *arg2, void *arg3) {
struct bio_job *job = zmalloc(sizeof(*job));
job->arg1 = arg1;
...
pthread_mutex_lock(&bio_mutex[type]);
listAddNodeTail(bio_jobs[type],job);
bio_pending[type]++;
pthread_cond_signal(&bio_condvar[type]);
pthread_mutex_unlock(&bio_mutex[type]);
}
void *bioProcessBackgroundJobs(void *arg) {
unsigned long type = (unsigned long) arg;
struct bio_job *job;
while(1) {
listNode *ln;
pthread_mutex_lock(&bio_mutex[type]);
if (listLength(bio_jobs[type]) == 0) {
//条件不成立,等待
pthread_cond_wait(&bio_condvar[type],&bio_mutex[type]);
//被通知以后,停止阻塞,重新判断条件
continue;
}
//条件成立,直接执行
ln = listFirst(bio_jobs[type]);
job = ln->value;
//取走值以后,解锁
pthread_mutex_unlock(&bio_mutex[type]);
//完成队列处理以后,根据类型调用close函数或者fsync函数。
if (type == BIO_CLOSE_FILE) {
close((long)job->arg1);
} else if (type == BIO_AOF_FSYNC) {
fsync((long)job->arg1);
} else {
serverPanic("Wrong job type in bioProcessBackgroundJobs().");
}
pthread_mutex_lock(&bio_mutex[type]);
listDelNode(bio_jobs[type],ln);
bio_pending[type]--;
}
}
if (server.aof_fsync == AOF_FSYNC_EVERYSEC)
sync_in_progress = bioPendingJobsOfType(BIO_AOF_FSYNC) != 0;
......
......
if (!sync_in_progress) aof_background_fsync(server.aof_fd);
void aof_background_fsync(int fd) {
bioCreateBackgroundJob(BIO_AOF_FSYNC,(void*)(long)fd,NULL,NULL);
}
为什么说渐进式懒删除很难做呢?作者说当我们删除一个集合的时候,可能我们删除集合中元素的速度还没有客户端向集合中添加元素的速度快,那我们的删除工作看起来是永远也无法完成了。
上一小节我们分析了使用DEL命令删除大key时会造成redis服务端阻塞,除此之外,在使用 FLUSHDB 和 FLUSHALL 删除包含大量键的数据库时,或者redis在清理过期数据和淘汰内存超限的数据时,如果碰巧撞到了大体积的键也会造成服务器阻塞。
为了解决以上问题, redis 4.0 引入了lazyfree的机制,它可以将删除键或数据库的操作放在后台线程里执行, 从而尽可能地避免服务器阻塞。
lazyfree的原理不难想象,就是在删除对象时只是进行逻辑删除,然后把对象丢给后台,让后台线程去执行真正的destruct,避免由于对象体积过大而造成阻塞。
void unlinkCommand(client *c) {
delGenericCommand(c, 1);
}
入口很简单,就是调用delGenericCommand,第二个参数为1表示需要异步删除。
/* This command implements DEL and LAZYDEL. */
void delGenericCommand(client *c, int lazy) {
int numdel = 0, j;
for (j = 1; j < c->argc; j++) {
expireIfNeeded(c->db,c->argv[j]);
int deleted = lazy ? dbAsyncDelete(c->db,c->argv[j]) :
dbSyncDelete(c->db,c->argv[j]);
if (deleted) {
signalModifiedKey(c->db,c->argv[j]);
notifyKeyspaceEvent(REDIS_NOTIFY_GENERIC,
"del",c->argv[j],c->db->id);
server.dirty++;
numdel++;
}
}
addReplyLongLong(c,numdel);
}
delGenericCommand函数根据lazy参数来决定是同步删除还是异步删除,同步删除的逻辑没有什么变化就不细讲了,我们重点看下新增的异步删除的实现。
#define LAZYFREE_THRESHOLD 64
// 首先定义了启用后台删除的阈值,对象中的元素大于该阈值时才真正丢给后台线程去删除,如果对象中包含的元素太少就没有必要丢给后台线程,因为线程同步也要一定的消耗。
int dbAsyncDelete(redisDb *db, robj *key) {
if (dictSize(db->expires) > 0) dictDelete(db->expires,key->ptr);
//清除待删除key的过期时间
dictEntry *de = dictUnlink(db->dict,key->ptr);
//dictUnlink返回数据库字典中包含key的条目指针,并从数据库字典中摘除该条目(并不会释放资源)
if (de) {
robj *val = dictGetVal(de);
size_t free_effort = lazyfreeGetFreeEffort(val);
//lazyfreeGetFreeEffort来获取val对象所包含的元素个数
if (free_effort > LAZYFREE_THRESHOLD) {
atomicIncr(lazyfree_objects,1);
//原子操作给lazyfree_objects加1,以备info命令查看有多少对象待后台线程删除
bioCreateBackgroundJob(BIO_LAZY_FREE ,val,NULL,NULL);
//此时真正把对象val丢到后台线程的任务队列中
dictSetVal(db->dict,de,NULL);
//把条目里的val指针设置为NULL,防止删除数据库字典条目时重复删除val对象
}
}
if (de) {
dictFreeUnlinkedEntry(db->dict,de);
//删除数据库字典条目,释放资源
return 1;
} else {
return 0;
}
}
4. 小结
redis-server
:redis的主线程,通过 I/O multiplexing 接收客户端请求,执行CPU指令,从redis发版之出就已经存在并一直作为Redis核心网络模型的主线程。bio_close_file、bio_aof_fsync
:Redis BIO系统的老成员,bio_close_file用来异步的关闭服务端文件,bio_aof_fsync用来异步的同步日志,它们也是从Redis发版之出就一直存在了。bio_lazy_free
:Redis在4.0版本BIO系统增加的新成员,它主要将删除键或数据库的操作放在后台线程里执行, 从而尽可能地避免服务器阻塞。io_thd_1、io_thd_2、io_thd_3
:Redis在6.0版本提供了I/O工作线程的配置,它们用来分担Redis主线程的I/O压力,使主线程更高效的进行CPU指令执行的工作。参考 & 延伸阅读
Lazy Redis is better Redis :antirez.com/news/93
The C10K problem :www.kegel.com/c10k.html
redis FAQ :redis.io/topics/faq
Benchmarking the experimental Redis Multi-Threaded I/O :itnext.io/benchmarkin…
Redis和I/O 多路复用 :draveness.me/redis-io-mu…
Redis 多线程网络模型全面揭秘 :mp.weixin.qq.com/s/pm2NsPzTO…
条件变量与锁 :yiwenshao.github.io/2016/11/05/…
Redis 6.0 多线程 IO 处理过程详解 :ruby-china.org/topics/3992…
Redis · lazyfree · 大key删除的福音 :mysql.taobao.org/monthly/201…
如喜欢本文,请点击右上角,把文章分享到朋友圈
如有想了解学习的技术点,请留言给若飞安排分享
因公众号更改推送规则,请点“在看”并加“星标”第一时间获取精彩技术分享
·END·
相关阅读:
作者:绘你一世倾城
来源:juejin.cn/post/6949929673615179789
版权申明:内容来源网络,仅供分享学习,版权归原创者所有。除非无法确认,我们都会标明作者及出处,如有侵权烦请告知,我们会立即删除并表示歉意。谢谢!
我们都是架构师!
关注架构师(JiaGouX),添加“星标”
获取每天技术干货,一起成为牛逼架构师
技术群请加若飞:1321113940 进架构师群
投稿、合作、版权等邮箱:admin@137x.com